package com.github.bluesbruce.mqtt.config;

import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

/**
 * MQTT消费者配置
 *
 * <p>这是第二个消费者</p>
 *
 * @author BBF
 */
@Configuration
public class MqttConfigTwo {

  public static final String CHANNEL_NAME_IN2 = "mqttInboundChannel_tow";
  private static final Logger LOGGER = LoggerFactory.getLogger(MqttConfigTwo.class);

  /**
   * MQTT消费端订阅通道（消息入站）
   *
   * @return {@link org.springframework.messaging.MessageChannel}
   */
  @Bean(name = CHANNEL_NAME_IN2)
  public MessageChannel inboundChannel2() {
    return new DirectChannel();
  }


  /**
   * MQTT消费端连接配置（消息入站）
   *
   * @param channel {@link org.springframework.messaging.MessageChannel}
   * @param factory {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
   * @return {@link org.springframework.integration.core.MessageProducer}
   */
  @Bean
  public MessageProducer inbound2(
      @Qualifier(CHANNEL_NAME_IN2) MessageChannel channel,
      @Qualifier(MqttConfig.FACTORY_NAME) MqttPahoClientFactory factory) {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
        "mqttConsumer_in_bound_2", factory, "hello");
    adapter.setCompletionTimeout(3000L);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setQos(1);
    // 设置订阅通道
    adapter.setOutputChannel(channel);
    return adapter;
  }

  /**
   * MQTT消费端消息处理器（消息入站）
   *
   * @return {@link org.springframework.messaging.MessageHandler}
   */
  @Bean
  @ServiceActivator(inputChannel = CHANNEL_NAME_IN2)
  public MessageHandler inboundHandler2() {
    return new MessageHandler() {
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
        Map<String, Object> headers = message.getHeaders();
        Object topic = headers.get("mqtt_receivedTopic");
        LOGGER.info("----MQTT第二个订阅---- top={}， getPayload={}",
            topic != null ? topic.toString() : "", message.getPayload());
      }
    };
  }
}
